
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
import sys

# Python 2 hack: reload(sys) re-exposes setdefaultencoding (deleted from the
# sys module at interpreter startup) so the UTF-8 default below can be set.
# This only works on Python 2 and is removed entirely in Python 3.
reload(sys)
import xlrd
import os

sys.setdefaultencoding('utf-8')
# Spark job setup: write Parquet with snappy compression, submit to YARN
# in client mode under the application name "test".
conf = SparkConf().set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext("yarn-client", "test", conf=conf)
sqlContext = HiveContext(sc)  # Hive-aware context; used for all SQL below
# NOTE(review): "sqlContest" looks like a typo for "sqlContext". This plain
# SQLContext global appears unused -- eachFile() shadows it with a local of
# the same name -- but it is kept in case unseen code references it.
sqlContest = SQLContext(sc)



def eachFile(dir, pt, date_dir="20181026"):
    """Load one Excel result file into Hive table history.yaq_zhengquan.

    Reads /home/data/yaq_zhengquan/<date_dir>/result/<dir>.xlsx into a pandas
    DataFrame, converts it to a Spark DataFrame, casts msisdn to string, and
    appends the rows as ORC files under partition pt=<pt>, refreshing the Hive
    partition metadata afterwards.

    :param dir: basename (without extension) of the Excel file to load
    :param pt: Hive partition value, e.g. '20181026'
    :param date_dir: date directory under /home/data/yaq_zhengquan/; defaults
        to '20181026' (the value that was previously hard-coded), so existing
        callers are unaffected while other batches can now be loaded too.
    """
    read_file_df = pd.read_excel(
        "/home/data/yaq_zhengquan/%(date_dir)s/result/%(dir)s.xlsx"
        % {"date_dir": date_dir, "dir": dir})
    # BUG FIX: the original registered the temp table on a freshly created
    # local SQLContext but ran the SELECT through the global HiveContext.
    # In Spark 1.x those contexts have separate temp-table catalogs, so
    # "tmp_table" could be unresolvable. Create and query the DataFrame
    # through the same (global) HiveContext instead.
    spark_df = sqlContext.createDataFrame(read_file_df)
    spark_df.registerTempTable("tmp_table")
    df = sqlContext.sql("""select
                cast(msisdn as string) msisdn
            from
                tmp_table""")
    df.repartition(10).write.mode("append").orc(
        "/user/hive/warehouse/history.db/yaq_zhengquan/pt=%s" % pt)
    # Refresh partition metadata so Hive sees the newly written ORC files.
    # NOTE(review): DROP PARTITION on a *managed* table deletes the partition
    # directory -- including the files just written above. This sequence is
    # only safe if history.yaq_zhengquan is an EXTERNAL table; confirm.
    sqlContext.sql("alter table history.yaq_zhengquan drop if exists partition (pt=%s)" % pt)
    sqlContext.sql("alter table history.yaq_zhengquan add if not exists partition (pt=%s)" % pt)


# Query: up to 50,000 msisdn values labelled '兴业证券' in dw.zhengquan since
# 2018-05 that do not yet exist in history.yaq_zhengquan (anti-join via
# LEFT OUTER JOIN ... WHERE t1.msisdn is null), each with its latest
# insert_time.
sql = """
	select
		t2.msisdn msisdn,
		max(t2.insert_time) insert_time
	from (
		select
			t.msisdn msisdn ,
			t.insert_time insert_time
		from
			(SELECT msisdn,insert_time from dw.zhengquan where insert_time >='2018-05' and smslabel = '兴业证券' )  t
		LEFT OUTER JOIN
			history.yaq_zhengquan t1
		ON t.msisdn = t1.msisdn
		WHERE t1.msisdn is null
	) t2
	group by
		t2.msisdn
	limit 50000
"""

# Run the anti-join and export the result batch to an Excel workbook.
df = sqlContext.sql(sql)
result_pdf = df.toPandas()
excel_writer = pd.ExcelWriter(
    "/home/data/yaq_zhengquan/20181026/result/20181026.xlsx",
    engine='xlsxwriter')
result_pdf.to_excel(excel_writer, sheet_name='Sheet1')
excel_writer.save()

# Load the freshly exported workbook back into Hive partition pt=20181026.
eachFile("20181026", "20181026")





# load data local inpath '/home/data/yaq_zhengquan/20181023/01.txt' into table history.yaq_zhengquan partition(pt='20181023');


# One-off bootstrap: snapshot every SMS whose content mentions 股票 (stocks)
# or whose label mentions 证券 (securities) into dw.zhengquan_20180829.
# NOTE(review): plain CREATE TABLE fails if the table already exists, so this
# line is not safe to rerun; it was presumably executed once and left here.
df=sqlContext.sql("CREATE TABLE dw.zhengquan_20180829 AS SELECT * from dw.sms_log where msg_content like '%股票%' or smslabel like '%证券%'")